Disruptor在撮合引擎的实践|得物技术
目录
一、Disruptor的简介
1. Disruptor的使用场景
2. Disruptor和ArrayBlockingQueue性能对比
3. Disruptor快速接入指南
4. Disruptor消费者等待策略
5. Disruptor灵活的消费者模式
二、Disruptor的核心概念
1. Disruptor内部组件交互图
2. 核心概念
三、Disruptor的特点
1. 环形数组结构
2. 无锁化设计
3. 独占缓存行的方式消除伪共享
4. 预分配内存
四、Disruptor在撮合引擎中的应用
1. 数字货币交易系统的简介
2. 撮合引擎流程图
3. 撮合引擎之Disruptor代码
五、总结
一
Disruptor的简介
Disruptor的使用场景
加密货币交易撮合引擎 Log4j2基于Disruptor实现的异步日志处理 Canal+Disruptor实现高效的数据同步 知名开源框架Apache Strom
2010年在QCon的演讲,介绍了基于Disruptor开发的系统单线程能支撑每秒600万订单,由此可见该组件可以大幅提升系统的TPS,所以对于一些需要大幅提升单机应用的吞吐量的场景可以考虑使用Disruptor。
Disruptor和ArrayBlockingQueue性能对比
ArrayBlockingQueue是基于数组ArrayList实现的,通过ReentrantLock独占锁保证线程安全; Disruptor是基于环形数组队列RingBuffer实现的,通过CAS乐观锁保证线程安全。在多种生产者-消费者模式下的性能对比。
Figure 1. Unicast: 1P–1C
Figure 2. Three Step Pipeline: 1P–3C
Figure 3. Sequencer: 3P–1C
Figure 4. Multicast: 1P–3C
Figure 5. Diamond: 1P–3C
Disruptor快速接入指南
引入Maven依赖
自定义事件和事件工厂
public class LongEvent {
private long value;
public void set(long value) {
this.value = value;
}
@Override
public String toString() {
return "LongEvent{" + "value=" + value + '}';
}
}
public class LongEventFactory implements EventFactory<LongEvent> {
@Override
public LongEvent newInstance() {
return new LongEvent();
}
}
定义事件处理器,即消费者
public class LongEventHandler implements EventHandler<LongEvent> {
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
System.out.println("Event: " + event);
}
}
定义事件生产者
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.examples.longevent.LongEvent;
import java.nio.ByteBuffer;
public class LongEventProducer {
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer bb) {
long sequence = ringBuffer.next();
try {
LongEvent event = ringBuffer.get(sequence);
event.set(bb.getLong(0));
}
finally {
ringBuffer.publish(sequence);
}
}
}
编写启动类
public class LongEventMain {
public static void main(String[] args) throws InterruptedException {
// 消费者线程池
Executor executor = Executors.newCachedThreadPool();
// 事件工厂
LongEventFactory eventFactory = new LongEventFactory();
// 指定RingBuffer大小
int bufferSize = 1024;
// 构造事件分发器
Disruptor<LongEvent> disruptor = new Disruptor<>(eventFactory
, bufferSize
, executor
, ProducerType.SINGLE // 1.ProducerType.SINGLE 单生产者模式 2.ProducerType.MULTI 多生产者模式
, new YieldingWaitStrategy());//消费者等待策略
// 注册消费者
disruptor.handleEventsWith(new LongEventHandler());
// 启动事件分发
disruptor.start();
// 获取RingBuffer 用于生产事件
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long i=0;true; i++) {
bb.putLong(0, i);
// 发送事件
producer.onData(bb);
Thread.sleep(1000);
}
}
}
Disruptor消费者等待策略
Disruptor灵活的消费者模式
支持单生产者和多生产者
单消费者
//注册单个消费者
disruptor.handleEventsWith(new LongEventHandler());
多消费者:并行的、广播模式
//注册多个消费者
disruptor.handleEventsWith(new LongEventHandler()
, new LongEventHandler1()
, new LongEventHandler2());
多消费者:并行的、消费者组模式
消费者需要实现WorkHandler接口,而不是 EventHandler 接口; 使用handleEventsWithWorkerPool设置Disruptor的消费者,而不是handleEventsWith方法。
public class LongWorkHandler implements WorkHandler<LongEvent> {
@Override
public void onEvent(LongEvent longEvent) throws Exception {
System.out.println("Event: " + logEvent);
}
}
public class OtherWorkHandler implements WorkHandler<LongEvent> {
@Override
public void onEvent(LongEvent longEvent) throws Exception {
System.out.println("Event: " + logEvent);
}
}
//注册消费者组
disruptor.handleEventsWithWorkerPool(new LongWorkHandler()
, new LongWorkHandler()
, new LongWorkHandler());
多个消费者组之间并行模式
//注册消费者组1
disruptor.handleEventsWithWorkerPool(new LongWorkHandler()
, new LongWorkHandler()
, new LongWorkHandler());
//注册消费者组2
disruptor.handleEventsWithWorkerPool(new OtherWorkHandler()
, new OtherWorkHandler()
, new OtherWorkHandler());
多个消费者组之间航道执行模式
//注册消费者
disruptor.handleEventsWithWorkerPool(new LongWorkHandler(), new LongWorkHandler(), new LongWorkHandler())
.thenHandleEventsWithWorkerPool(new OtherWorkHandler(), new OtherWorkHandler(), new OtherWorkHandler());
多消费者:链式、菱形、六边形执行模式
//链式
disruptor.handleEventsWith(new LongEventHandler11()).then(new LongEventHandler12());
disruptor.handleEventsWith(new LongEventHandler21()).then(new LongEventHandler22());
//菱形
disruptor.handleEventsWith(new LongEventHandler1(), new LongEventHandler2())
.then(new LongEventHandler3());
//六边形
LongEventHandler handler11 = new LongEventHandler();
LongEventHandler handler12 = new LongEventHandler();
LongEventHandler handler21 = new LongEventHandler();
LongEventHandler handler22 = new LongEventHandler();
LongEventHandler handler3 = new LongEventHandler();
disruptor.handleEventsWith(handler11, handler21);
disruptor.after(handler11).handleEventsWith(handler12);
disruptor.after(handler21).handleEventsWith(handler22);
disruptor.after(handler12, handler22).handleEventsWith(handler3);
二
Disruptor的核心概念
Disruptor内部组件交互图
核心概念
Sequence
Sequencer
SequenceBarrier(序列屏障)
三
Disruptor的特点
环形数组结构
采用首尾相接的数组而非链表,无需担心index溢出问题,且数组对处理器的缓存机制更加友好; 在RingBuffer数组长度设置为2^N时,通过sequence & (bufferSize-1)加速定位元素实际下标索引,通过结合左移(<<)操作实现乘法; 结合SequenceBarrier机制,实现线程与线程之间高效的数据交互。
无锁化设计
独占缓存行的方式消除伪共享
什么是伪共享
一个缓存行可以存储多个变量(存满当前缓存行的字节数);64个字节可以放8个long,16个int; 而CPU对缓存的修改又是以缓存行为最小单位的;不是以long 、byte这样的数据类型为单位的; 在多线程情况下,如果需要修改“共享同一个缓存行的其中一个变量”,该行中其他变量的状态就会失效,甚至进行一致性保护。
Disruptor是如何解决伪共享的
预分配内存
见com.lmax.disruptor.RingBuffer.fill(EventFactoryeventFactory)
四
Disruptor在撮合引擎中的应用
数字货币交易系统的简介
背景&价值
C端核心界面
以上截图仅用于技术展示,不构成投资建议
交易系统简化交互图
撮合应用的特点
纯内存的、CPU密集型的
有状态的
撮合引擎流程图
撮合引擎之Disruptor代码
定义事件:用户交易单
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DisruptorEvent implements Serializable {
private static final long serialVersionUID = -5886259612924517631L;
//成交单
private EntrustOrder entrustOrder;
}
定义事件处理器:对用户买单和卖单进行撮合匹配
//撮合事件处理器
public class ResultsHandler implements EventHandler<DisruptorEvent> {
private final Set<Integer> symbolIdSet = new HashSet<>();
private int workerQueueSize;
public ResultsHandler(Set<Integer> symbolIdSet, int queueSize) {
this.symbolIdSet.addAll(symbolIdSet);
this.workerQueueSize = queueSize;
}
@Override
public void onEvent(DisruptorEvent disruptorEvent, long sequence, boolean endOfBatch) {
try {
//获取订单
EntrustOrder entrustOrder = disruptorEvent.getEntrustOrder();
//常规的撮合,正常撤单,异常撤单
if (OperationTypeEnum.MATCH.getCode() == entrustOrder.getOperationType() ||
OperationTypeEnum.CANCEL.getCode() == entrustOrder.getOperationType()) {
// 取消订单需要在引擎内处理
if (Objects.equals(entrustOrder.getOperationType(), OperationTypeEnum.MATCH.getCode())) {
//更新为处理中
OrderBook.addToOrderBook(entrustOrder.getOrderId(), MatchStatusEnum.MATCH_ING);
} else if (Objects.equals(entrustOrder.getOperationType(), OperationTypeEnum.CANCEL.getCode())) {
//更新为处理中
if (OrderBook.getByOrderId(entrustOrder.getOrderId()) != null) {
OrderBook.addToOrderBook(entrustOrder.getOrderId(), MatchStatusEnum.CANCEL_ING);
}
}
// 执行撮合
this.doMatch(entrustOrder);
}
} catch (Exception e) {
log.error("match disruptor event handler error:{}", e.getMessage(), e);
}
}
/**
* 根据规则选择不同的撮合策略算法,进行撮合处理
* @param takerOrder
*/
public void doMatch(EntrustOrder takerOrder) {
SideEnum sideEnum = SideEnum.getSideEnum(takerOrder.getSide());
OrderTypeEnum orderTypeEnum = OrderTypeEnum.getOrderTypeEnum(takerOrder.getOrderType());
//选择撮合策略
MatchService matchService = MatchStrategy.router(orderTypeEnum, sideEnum);
MatchContext matchContext = MatchContext.getContext();
matchContext.setTakerOrder(takerOrder);
//执行撮合
matchService.start(matchContext);
//撮合完成
matchService.stop(matchContext);
}
}
事件生产者:构建Disruptor、生产事件
/**
* disruptor启动入口类,当系统读取到当前机器需要为哪些交易对提供服务的时候,
* 我们需要为这些交易对进行分组服务,哪些交易对放到同一个disruptor中
* 通过分组,一方面确保了活跃度高的交易对能够最大程度的利用资源,另一方面活跃度低的交易对能够有效处理,
* 同时降低了cpu暴涨的风险
*/
@Service
@Slf4j
public class ExchangeLauncher {
private static int BUFFER_SIZE = 1024 * 16;
@Resource
private LimitBuyMatchService limitBuyMatchService;
@Resource
private LimitSellMatchService limitSellMatchService;
@Resource
private MarketBuyMatchService marketBuyMatchService;
@Resource
private MarketSellMatchService marketSellMatchService;
@Resource
private MatchClusterConfiguration matchClusterConfiguration;
@Value("${match.worker-queue-size:5}")
private int workSize;
//一个交易对对应一个disruptor
private Map<Integer, ExchangeCore> exchangeCoreMap = new ConcurrentHashMap<>();
private List<ExchangeCore> exchangeCoreList = new CopyOnWriteArrayList<>();
public void start() {
try {
//init order book
OrderBook.init();
Set<Integer> symbolIdListSet = matchClusterConfiguration.getMasterSymbolIdSet();
if (CollectionUtils.isNotEmpty(symbolIdListSet)) {
List<Integer> allSymbolIds = new ArrayList<>(symbolIdListSet);
List<List<Integer>> pageList = ListUtils.partition(allSymbolIds, workSize);
pageList.forEach(symbolIds -> {
ResultsHandler handler = new ResultsHandler(new HashSet<>(symbolIds), workSize);
ExchangeCore exchangeCore = new ExchangeCore(handler, BUFFER_SIZE, new NamedThreadFactory("match", false));
exchangeCore.start();
exchangeCoreList.add(exchangeCore);
symbolIds.forEach(symbolId -> exchangeCoreMap.put(symbolId, exchangeCore));
});
}
// 注册matchService子类
registerMatchServices();
} catch (Exception e) {
log.error("exchangeLauncher start error:{}", e.getMessage(), e);
}
}
private void registerMatchServices() {
MatchStrategy.register(OrderTypeEnum.LIMIT, SideEnum.BUY, limitBuyMatchService);
MatchStrategy.register(OrderTypeEnum.LIMIT, SideEnum.SELL, limitSellMatchService);
MatchStrategy.register(OrderTypeEnum.MARKET, SideEnum.BUY, marketBuyMatchService);
MatchStrategy.register(OrderTypeEnum.MARKET, SideEnum.SELL, marketSellMatchService);
}
}
public class ExchangeCore extends AbstractLifeCycle {
private final Disruptor<DisruptorEvent> disruptor;
private MatchEventPublisher publisher;
private ResultsHandler eventHandler;
public ExchangeCore(ResultsHandler matchHandler, int ringBufferSize, ThreadFactory threadFactory) {
EventFactory eventFactory = () -> new DisruptorEvent();
this.disruptor = new Disruptor<>(eventFactory, ringBufferSize, threadFactory);
publisher = new MatchEventPublisher(this.disruptor);
disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
this.eventHandler = matchHandler;
disruptor.handleEventsWith(eventHandler);
disruptor.start();
}
@Override
public void start() {
super.start();
}
@Override
public void stop() {
super.stop();
disruptor.shutdown();
}
public BaseResponse doMatch(EntrustOrder taker) {
// 前置处理----start
if (OrderTypeEnum.getOrderTypeEnum(taker.getOrderType()) == null || SideEnum.getSideEnum(taker.getSide()) == null) {
log.error("{} - parameter error:{} or {}", taker.getTraceId(), "orderType", "side");
return BaseResponse.error(TradingMatchCodeEnum.PARAMETER_ERROR);
}
MatchStatusEnum matchStatusEnum = OrderBook.getByOrderId(taker.getOrderId());
MetricService metricService = SpringContextUtil.getBean(MetricService.class);
MatchClusterConfiguration configuration = SpringContextUtil.getBean(MatchClusterConfiguration.class);
// 撮合防重校验,并发存在问题。但是消费的时候,是单线程,做了校验,不存在重复撮合的问题。
if (OperationTypeEnum.MATCH.getCode() == taker.getOperationType()) {
if (matchStatusEnum != null) {
//短时间内重复撮合
log.error("{} - match repeat ,orderId :{}", taker.getTraceId(), taker.getOrderId());
return BaseResponse.error(TradingMatchCodeEnum.REPEAT_REQUEST);
}
//构造对象进入等待队列
OrderBook.addToOrderBook(taker.getOrderId(), MatchStatusEnum.WAIT_ING);
metricService.count(MetricNames.ORDER_TYPE_NUM, "type", "match", "group", configuration.getClusterName());
} else if (OperationTypeEnum.CANCEL.getCode() == taker.getOperationType()) {
int cancelType = taker.getCancelType();
/**
异常单-产生情况:收单服务 调用撮合 出现异常,不知道成功没,没有明确响应 开始进行异常撤单
*/
if (CancelTypeEnum.NORMAL_CANCEL.getCode() == cancelType) {
if (matchStatusEnum == null) {
// 数据有可能在请求队列中被逐出,需要继续走逻辑
//
} else {
if (MatchStatusEnum.MATCH_END == matchStatusEnum) {
//重复撤销,深度盘已经没有数据,没必要继续往下,不走disruptor 和撮合直接返回
log.error("{} - cancel failed, match end ,orderId :{}", taker.getTraceId(), taker.getOrderId());
return BaseResponse.error(TradingMatchCodeEnum.REPEAT_REQUEST);
}
OrderBook.addToOrderBook(taker.getOrderId(), MatchStatusEnum.WAIT_CANCEL);
}
} else {
// reload异常撤单,要加入内存
OrderBook.addToOrderBook(taker.getOrderId(), MatchStatusEnum.WAIT_CANCEL);
}
} else {
log.warn("--------can not find the operationType[{}]", taker.getOperationType());
throw new TradingMatchException("can not find the operationType[" + taker.getOperationType() + "]");
}
// 前置处理----end
//Disruptor开始发布事件
publisher.publish(taker);
return BaseResponse.success();
}
public Disruptor<DisruptorEvent> getDisruptor() {
return disruptor;
}
}
public class MatchEventPublisher {
private Disruptor<DisruptorEvent> disruptor;
public MatchEventPublisher(Disruptor<DisruptorEvent> disruptor) {
this.disruptor = disruptor;
}
private static final EventTranslatorOneArg<DisruptorEvent, EntrustOrder> TRANSLATOR =
(event, sequence, entrustOrder) -> {
event.setEntrustOrder(entrustOrder);
};
public void publish(EntrustOrder taker) {
RingBuffer<DisruptorEvent> ringBuffer = disruptor.getRingBuffer();
taker.setSequence(ringBuffer.getCursor());
taker.setArriveTime(System.currentTimeMillis());
ringBuffer.publishEvent(TRANSLATOR, taker);
// ...
}
}
五
总结
往期回顾
文 / 天佑
关注得物技术,每周一、三、五更新技术干货
要是觉得文章对你有帮助的话,欢迎评论转发点赞~
未经得物技术许可严禁转载,否则依法追究法律责任。“
扫码添加小助手微信
如有任何疑问,或想要了解更多技术资讯,请添加小助手微信:
快快点击下方图片报名吧!